package com.omuao.message.websocket.manager.impl;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.omuao.message.websocket.config.RocksDbConfig;
import com.omuao.message.websocket.facade.ClientSessionManager;
import com.omuao.message.websocket.facade.MqttMessageListener;
import com.omuao.message.websocket.facade.QueueDataManager;
import com.omuao.message.websocket.facade.WebSocketMessage;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Repository;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * Default {@link QueueDataManager} implementation that keeps
 * {@link WebSocketMessage} instances in RocksDB.
 *
 * <p>Each message is serialized with the injected {@link ObjectMapper} and stored
 * under the UTF-8 bytes of its {@code getKey()} value. Failures of the underlying
 * store are logged and reported to the caller through the return value
 * ({@code -1} or {@code null}) instead of being thrown.
 *
 * @author omuao
 */
@Repository
public class DefaultQueueDataManagerImpl implements QueueDataManager {

    /**
     * RocksDB handle used for every read, write, delete and scan in this class.
     */
    @Autowired
    @Qualifier(RocksDbConfig.ROCK_DB_WRITE)
    RocksDB db;

    /**
     * Mapper used to convert messages to and from their stored byte form.
     */
    @Autowired
    ObjectMapper objectMapper;

    /**
     * Injected MQTT client; not referenced by the code in this class.
     */
    @Autowired
    @Qualifier("mqttClient")
    MqttClient mqttClient;

    /**
     * Injected MQTT connect options; not referenced by the code in this class.
     */
    @Autowired
    @Qualifier("mqttConnectOptions")
    MqttConnectOptions mqttConnectOptions;

    /**
     * Listener used by {@link #sendOfflineMessage()} to deliver stored messages.
     */
    @Autowired
    MqttMessageListener mqttMessageListener;

    /**
     * Session registry consulted by {@link #sendOfflineMessage()} to decide whether
     * a message's receiver is currently known.
     */
    @Autowired
    ClientSessionManager clientSessionManager;

    public static final Logger logger = LoggerFactory.getLogger(DefaultQueueDataManagerImpl.class);

    /**
     * Removes the record stored under the message's key.
     *
     * @param queueData message whose key identifies the record to remove
     * @return {@code 1} when the delete was issued successfully, {@code -1} when the
     *         message or its key is {@code null} or RocksDB reported an error
     */
    @Override
    public int delete(WebSocketMessage queueData) {
        byte[] key = keyBytes(queueData);
        if (key == null) {
            return -1;
        }
        try {
            db.delete(key);
            return 1;
        } catch (RocksDBException e) {
            logger.error("Failed to delete message: " + e.getMessage(), e);
            return -1;
        }
    }

    /**
     * Loads the message stored under the given message's key.
     *
     * @param queueData message whose key identifies the record to load
     * @return the stored message, or {@code null} when the message or its key is
     *         {@code null}, nothing is stored under the key, RocksDB reported an
     *         error, or the stored bytes could not be deserialized
     */
    @Override
    public WebSocketMessage query(WebSocketMessage queueData) {
        byte[] key = keyBytes(queueData);
        if (key == null) {
            return null;
        }
        byte[] stored;
        try {
            stored = db.get(key);
        } catch (RocksDBException e) {
            logger.error("Failed to read message: " + e.getMessage(), e);
            return null;
        }
        if (stored == null) {
            return null;
        }
        return deserialize(stored);
    }

    /**
     * Stores the message under its key, replacing any existing record.
     *
     * @param queueData message to store
     * @return {@code 1} on success, {@code -1} when the message or its key is
     *         {@code null}, serialization failed, or RocksDB reported an error
     */
    @Override
    public int insert(WebSocketMessage queueData) {
        return store(queueData);
    }

    /**
     * Overwrites the record stored under the message's key. RocksDB {@code put}
     * has upsert semantics, so this is the same operation as {@link #insert}.
     *
     * @param queueData message to store
     * @return {@code 1} on success, {@code -1} when the message or its key is
     *         {@code null}, serialization failed, or RocksDB reported an error
     */
    @Override
    public int update(WebSocketMessage queueData) {
        return store(queueData);
    }

    /**
     * Triggers a full-range compaction of the database. Errors are logged only.
     */
    @Override
    public void compactRange() {
        try {
            db.compactRange();
        } catch (RocksDBException e) {
            logger.error(e.getMessage(), e);
        }
    }

    /**
     * Scans every stored record and tries to deliver it.
     *
     * <p>For each record: the value is deserialized; records that cannot be
     * deserialized are logged and skipped. If {@link #clientSessionManager} does not
     * contain the message's receiver id the record is left in place. Otherwise the
     * message is handed to {@link #mqttMessageListener}; when that reports success
     * the record is deleted from the database.
     */
    @Override
    public void sendOfflineMessage() {
        // try-with-resources guarantees the native iterator is released even if
        // delivery or a collaborator throws part-way through the scan.
        try (RocksIterator iterator = db.newIterator()) {
            for (iterator.seekToFirst(); iterator.isValid(); iterator.next()) {
                WebSocketMessage message = deserialize(iterator.value());
                if (message == null) {
                    // Unreadable record: already logged; keep scanning the rest.
                    continue;
                }
                if (!clientSessionManager.contains(message.getReceiverId())) {
                    continue;
                }
                if (mqttMessageListener.sendMessage(message)) {
                    // Delete by the key the record is actually stored under rather
                    // than re-deriving it from the deserialized payload.
                    try {
                        db.delete(iterator.key());
                    } catch (RocksDBException e) {
                        logger.error("Failed to delete delivered message: " + e.getMessage(), e);
                    }
                }
            }
        }
    }

    /**
     * Serializes the message and writes it under its key.
     *
     * @return {@code 1} on success, {@code -1} on any failure
     */
    private int store(WebSocketMessage queueData) {
        byte[] key = keyBytes(queueData);
        if (key == null) {
            return -1;
        }
        byte[] content;
        try {
            // writeValueAsBytes always emits UTF-8, which is what readValue(byte[])
            // expects when the record is read back.
            content = objectMapper.writeValueAsBytes(queueData);
        } catch (JsonProcessingException e) {
            logger.error("Failed to serialize message: " + e.getMessage(), e);
            return -1;
        }
        try {
            db.put(key, content);
            return 1;
        } catch (RocksDBException e) {
            logger.error("Failed to write message: " + e.getMessage(), e);
            return -1;
        }
    }

    /**
     * Converts stored bytes back into a message, returning {@code null} (after
     * logging) when the bytes cannot be parsed.
     */
    private WebSocketMessage deserialize(byte[] stored) {
        try {
            return objectMapper.readValue(stored, WebSocketMessage.class);
        } catch (IOException e) {
            logger.error("Failed to deserialize stored message: " + e.getMessage(), e);
            return null;
        }
    }

    /**
     * Returns the UTF-8 bytes of the message's key, or {@code null} when the message
     * or its key is {@code null}. The charset is fixed so that keys do not depend on
     * the JVM's platform default.
     */
    private static byte[] keyBytes(WebSocketMessage queueData) {
        if (queueData == null) {
            return null;
        }
        String key = queueData.getKey();
        return key == null ? null : key.getBytes(StandardCharsets.UTF_8);
    }
}
